{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "This is a companion notebook for the book [Deep Learning with Python, Second Edition](https://www.manning.com/books/deep-learning-with-python-second-edition?a_aid=keras&a_bid=76564dff). For readability, it only contains runnable code blocks and section titles, and omits everything else in the book: text paragraphs, figures, and pseudocode.\n\n**If you want to be able to follow what's going on, I recommend reading the notebook side by side with your copy of the book.**\n\nThis notebook was generated for TensorFlow 2.6."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "## Beyond text classification: Sequence-to-sequence learning"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "### A machine translation example"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "!wget http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip\n",
    "!unzip -q spa-eng.zip"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "text_file = \"spa-eng/spa.txt\"\n",
    "with open(text_file) as f:\n",
    "    lines = f.read().split(\"\\n\")[:-1]\n",
    "text_pairs = []\n",
    "for line in lines:\n",
    "    english, spanish = line.split(\"\\t\")\n",
    "    spanish = \"[start] \" + spanish + \" [end]\"\n",
    "    text_pairs.append((english, spanish))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "import random\n",
    "print(random.choice(text_pairs))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "import random\n",
    "random.shuffle(text_pairs)\n",
    "num_val_samples = int(0.15 * len(text_pairs))\n",
    "num_train_samples = len(text_pairs) - 2 * num_val_samples\n",
    "train_pairs = text_pairs[:num_train_samples]\n",
    "val_pairs = text_pairs[num_train_samples:num_train_samples + num_val_samples]\n",
    "test_pairs = text_pairs[num_train_samples + num_val_samples:]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "**Vectorizing the English and Spanish text pairs**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "import tensorflow as tf\n",
    "import string\n",
    "import re\n",
    "from tensorflow import keras\n",
    "from tensorflow.keras import layers\n",
    "\n",
    "strip_chars = string.punctuation + \"\u00bf\"\n",
    "strip_chars = strip_chars.replace(\"[\", \"\")\n",
    "strip_chars = strip_chars.replace(\"]\", \"\")\n",
    "\n",
    "def custom_standardization(input_string):\n",
    "    lowercase = tf.strings.lower(input_string)\n",
    "    return tf.strings.regex_replace(\n",
    "        lowercase, f\"[{re.escape(strip_chars)}]\", \"\")\n",
    "\n",
    "vocab_size = 15000\n",
    "sequence_length = 20\n",
    "\n",
    "source_vectorization = layers.TextVectorization(\n",
    "    max_tokens=vocab_size,\n",
    "    output_mode=\"int\",\n",
    "    output_sequence_length=sequence_length,\n",
    ")\n",
    "target_vectorization = layers.TextVectorization(\n",
    "    max_tokens=vocab_size,\n",
    "    output_mode=\"int\",\n",
    "    output_sequence_length=sequence_length + 1,\n",
    "    standardize=custom_standardization,\n",
    ")\n",
    "train_english_texts = [pair[0] for pair in train_pairs]\n",
    "train_spanish_texts = [pair[1] for pair in train_pairs]\n",
    "source_vectorization.adapt(train_english_texts)\n",
    "target_vectorization.adapt(train_spanish_texts)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "**Preparing datasets for the translation task**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "batch_size = 64\n",
    "\n",
    "def format_dataset(eng, spa):\n",
    "    eng = source_vectorization(eng)\n",
    "    spa = target_vectorization(spa)\n",
    "    return ({\n",
    "        \"english\": eng,\n",
    "        \"spanish\": spa[:, :-1],\n",
    "    }, spa[:, 1:])\n",
    "\n",
    "def make_dataset(pairs):\n",
    "    eng_texts, spa_texts = zip(*pairs)\n",
    "    eng_texts = list(eng_texts)\n",
    "    spa_texts = list(spa_texts)\n",
    "    dataset = tf.data.Dataset.from_tensor_slices((eng_texts, spa_texts))\n",
    "    dataset = dataset.batch(batch_size)\n",
    "    dataset = dataset.map(format_dataset, num_parallel_calls=4)\n",
    "    return dataset.shuffle(2048).prefetch(16).cache()\n",
    "\n",
    "train_ds = make_dataset(train_pairs)\n",
    "val_ds = make_dataset(val_pairs)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "for inputs, targets in train_ds.take(1):\n",
    "    print(f\"inputs['english'].shape: {inputs['english'].shape}\")\n",
    "    print(f\"inputs['spanish'].shape: {inputs['spanish'].shape}\")\n",
    "    print(f\"targets.shape: {targets.shape}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "### Sequence-to-sequence learning with RNNs"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "**GRU-based encoder**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "from tensorflow import keras\n",
    "from tensorflow.keras import layers\n",
    "\n",
    "embed_dim = 256\n",
    "latent_dim = 1024\n",
    "\n",
    "source = keras.Input(shape=(None,), dtype=\"int64\", name=\"english\")\n",
    "x = layers.Embedding(vocab_size, embed_dim, mask_zero=True)(source)\n",
    "encoded_source = layers.Bidirectional(\n",
    "    layers.GRU(latent_dim), merge_mode=\"sum\")(x)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "**GRU-based decoder and the end-to-end model**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "past_target = keras.Input(shape=(None,), dtype=\"int64\", name=\"spanish\")\n",
    "x = layers.Embedding(vocab_size, embed_dim, mask_zero=True)(past_target)\n",
    "decoder_gru = layers.GRU(latent_dim, return_sequences=True)\n",
    "x = decoder_gru(x, initial_state=encoded_source)\n",
    "x = layers.Dropout(0.5)(x)\n",
    "target_next_step = layers.Dense(vocab_size, activation=\"softmax\")(x)\n",
    "seq2seq_rnn = keras.Model([source, past_target], target_next_step)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "**Training our recurrent sequence-to-sequence model**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "seq2seq_rnn.compile(\n",
    "    optimizer=\"rmsprop\",\n",
    "    loss=\"sparse_categorical_crossentropy\",\n",
    "    metrics=[\"accuracy\"])\n",
    "seq2seq_rnn.fit(train_ds, epochs=15, validation_data=val_ds)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "**Translating new sentences with our RNN encoder and decoder**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "spa_vocab = target_vectorization.get_vocabulary()\n",
    "spa_index_lookup = dict(zip(range(len(spa_vocab)), spa_vocab))\n",
    "max_decoded_sentence_length = 20\n",
    "\n",
    "def decode_sequence(input_sentence):\n",
    "    tokenized_input_sentence = source_vectorization([input_sentence])\n",
    "    decoded_sentence = \"[start]\"\n",
    "    for i in range(max_decoded_sentence_length):\n",
    "        tokenized_target_sentence = target_vectorization([decoded_sentence])\n",
    "        next_token_predictions = seq2seq_rnn.predict(\n",
    "            [tokenized_input_sentence, tokenized_target_sentence])\n",
    "        sampled_token_index = np.argmax(next_token_predictions[0, i, :])\n",
    "        sampled_token = spa_index_lookup[sampled_token_index]\n",
    "        decoded_sentence += \" \" + sampled_token\n",
    "        if sampled_token == \"[end]\":\n",
    "            break\n",
    "    return decoded_sentence\n",
    "\n",
    "test_eng_texts = [pair[0] for pair in test_pairs]\n",
    "for _ in range(20):\n",
    "    input_sentence = random.choice(test_eng_texts)\n",
    "    print(\"-\")\n",
    "    print(input_sentence)\n",
    "    print(decode_sequence(input_sentence))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "### Sequence-to-sequence learning with Transformer"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "#### The Transformer decoder"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "**The `TransformerDecoder`**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "class TransformerDecoder(layers.Layer):\n",
    "    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):\n",
    "        super().__init__(**kwargs)\n",
    "        self.embed_dim = embed_dim\n",
    "        self.dense_dim = dense_dim\n",
    "        self.num_heads = num_heads\n",
    "        self.attention_1 = layers.MultiHeadAttention(\n",
    "            num_heads=num_heads, key_dim=embed_dim)\n",
    "        self.attention_2 = layers.MultiHeadAttention(\n",
    "            num_heads=num_heads, key_dim=embed_dim)\n",
    "        self.dense_proj = keras.Sequential(\n",
    "            [layers.Dense(dense_dim, activation=\"relu\"),\n",
    "             layers.Dense(embed_dim),]\n",
    "        )\n",
    "        self.layernorm_1 = layers.LayerNormalization()\n",
    "        self.layernorm_2 = layers.LayerNormalization()\n",
    "        self.layernorm_3 = layers.LayerNormalization()\n",
    "        self.supports_masking = True\n",
    "\n",
    "    def get_config(self):\n",
    "        config = super().get_config()\n",
    "        config.update({\n",
    "            \"embed_dim\": self.embed_dim,\n",
    "            \"num_heads\": self.num_heads,\n",
    "            \"dense_dim\": self.dense_dim,\n",
    "        })\n",
    "        return config\n",
    "\n",
    "    def get_causal_attention_mask(self, inputs):\n",
    "        input_shape = tf.shape(inputs)\n",
    "        batch_size, sequence_length = input_shape[0], input_shape[1]\n",
    "        i = tf.range(sequence_length)[:, tf.newaxis]\n",
    "        j = tf.range(sequence_length)\n",
    "        mask = tf.cast(i >= j, dtype=\"int32\")\n",
    "        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))\n",
    "        mult = tf.concat(\n",
    "            [tf.expand_dims(batch_size, -1),\n",
    "             tf.constant([1, 1], dtype=tf.int32)], axis=0)\n",
    "        return tf.tile(mask, mult)\n",
    "\n",
    "    def call(self, inputs, encoder_outputs, mask=None):\n",
    "        causal_mask = self.get_causal_attention_mask(inputs)\n",
    "        if mask is not None:\n",
    "            padding_mask = tf.cast(\n",
    "                mask[:, tf.newaxis, :], dtype=\"int32\")\n",
    "            padding_mask = tf.minimum(padding_mask, causal_mask)\n",
    "        else:\n",
    "            padding_mask = mask\n",
    "        attention_output_1 = self.attention_1(\n",
    "            query=inputs,\n",
    "            value=inputs,\n",
    "            key=inputs,\n",
    "            attention_mask=causal_mask)\n",
    "        attention_output_1 = self.layernorm_1(inputs + attention_output_1)\n",
    "        attention_output_2 = self.attention_2(\n",
    "            query=attention_output_1,\n",
    "            value=encoder_outputs,\n",
    "            key=encoder_outputs,\n",
    "            attention_mask=padding_mask,\n",
    "        )\n",
    "        attention_output_2 = self.layernorm_2(\n",
    "            attention_output_1 + attention_output_2)\n",
    "        proj_output = self.dense_proj(attention_output_2)\n",
    "        return self.layernorm_3(attention_output_2 + proj_output)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "#### Putting it all together: A Transformer for machine translation"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "**PositionalEmbedding layer**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "class PositionalEmbedding(layers.Layer):\n",
    "    def __init__(self, sequence_length, input_dim, output_dim, **kwargs):\n",
    "        super().__init__(**kwargs)\n",
    "        self.token_embeddings = layers.Embedding(\n",
    "            input_dim=input_dim, output_dim=output_dim)\n",
    "        self.position_embeddings = layers.Embedding(\n",
    "            input_dim=sequence_length, output_dim=output_dim)\n",
    "        self.sequence_length = sequence_length\n",
    "        self.input_dim = input_dim\n",
    "        self.output_dim = output_dim\n",
    "\n",
    "    def call(self, inputs):\n",
    "        length = tf.shape(inputs)[-1]\n",
    "        positions = tf.range(start=0, limit=length, delta=1)\n",
    "        embedded_tokens = self.token_embeddings(inputs)\n",
    "        embedded_positions = self.position_embeddings(positions)\n",
    "        return embedded_tokens + embedded_positions\n",
    "\n",
    "    def compute_mask(self, inputs, mask=None):\n",
    "        return tf.math.not_equal(inputs, 0)\n",
    "\n",
    "    def get_config(self):\n",
    "        config = super(PositionalEmbedding, self).get_config()\n",
    "        config.update({\n",
    "            \"output_dim\": self.output_dim,\n",
    "            \"sequence_length\": self.sequence_length,\n",
    "            \"input_dim\": self.input_dim,\n",
    "        })\n",
    "        return config"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "**End-to-end Transformer**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "embed_dim = 256\n",
    "dense_dim = 2048\n",
    "num_heads = 8\n",
    "\n",
    "encoder_inputs = keras.Input(shape=(None,), dtype=\"int64\", name=\"english\")\n",
    "x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)\n",
    "encoder_outputs = TransformerEncoder(embed_dim, dense_dim, num_heads)(x)\n",
    "\n",
    "decoder_inputs = keras.Input(shape=(None,), dtype=\"int64\", name=\"spanish\")\n",
    "x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)\n",
    "x = TransformerDecoder(embed_dim, dense_dim, num_heads)(x, encoder_outputs)\n",
    "x = layers.Dropout(0.5)(x)\n",
    "decoder_outputs = layers.Dense(vocab_size, activation=\"softmax\")(x)\n",
    "transformer = keras.Model([encoder_inputs, decoder_inputs], decoder_outputs)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "**Training the sequence-to-sequence Transformer**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "transformer.compile(\n",
    "    optimizer=\"rmsprop\",\n",
    "    loss=\"sparse_categorical_crossentropy\",\n",
    "    metrics=[\"accuracy\"])\n",
    "transformer.fit(train_ds, epochs=30, validation_data=val_ds)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "**Translating new sentences with our Transformer model**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "spa_vocab = target_vectorization.get_vocabulary()\n",
    "spa_index_lookup = dict(zip(range(len(spa_vocab)), spa_vocab))\n",
    "max_decoded_sentence_length = 20\n",
    "\n",
    "def decode_sequence(input_sentence):\n",
    "    tokenized_input_sentence = source_vectorization([input_sentence])\n",
    "    decoded_sentence = \"[start]\"\n",
    "    for i in range(max_decoded_sentence_length):\n",
    "        tokenized_target_sentence = target_vectorization(\n",
    "            [decoded_sentence])[:, :-1]\n",
    "        predictions = transformer(\n",
    "            [tokenized_input_sentence, tokenized_target_sentence])\n",
    "        sampled_token_index = np.argmax(predictions[0, i, :])\n",
    "        sampled_token = spa_index_lookup[sampled_token_index]\n",
    "        decoded_sentence += \" \" + sampled_token\n",
    "        if sampled_token == \"[end]\":\n",
    "            break\n",
    "    return decoded_sentence\n",
    "\n",
    "test_eng_texts = [pair[0] for pair in test_pairs]\n",
    "for _ in range(20):\n",
    "    input_sentence = random.choice(test_eng_texts)\n",
    "    print(\"-\")\n",
    "    print(input_sentence)\n",
    "    print(decode_sequence(input_sentence))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "## Summary"
   ]
  }
 ],
 "metadata": {
  "colab": {
   "collapsed_sections": [],
   "name": "chapter11_part04_sequence-to-sequence-learning.i",
   "private_outputs": false,
   "provenance": [],
   "toc_visible": true
  },
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.0"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
